ClickHouse Kafka 引擎教程 |
您所在的位置:网站首页 › clickhouse 使用csv建表 › ClickHouse Kafka 引擎教程 |
现在让我们在 Kafka 上设置一个主题,我们可以使用它来加载消息。登录到 Kafka 服务器,然后使用以下示例中的命令创建主题。在此示例中,“kafka”是服务器的 DNS 名称。如果您有其他 DNS 名称,请改用该名称。您还可以调整分区数以及复制因子。 kafka-topics \ --bootstrap-server kafka:9092 \ --topic readings \ --create --partitions 6 \ --replication-factor 2检查主题是否已成功创建。 kafka-topics --bootstrap-server kafka:9092 --describe readings 你将看到如下所示的输出,其中显示了其分区的主题和当前状态。 Topic: readings PartitionCount: 6 ReplicationFactor: 2 Configs: Topic: readings Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0,2 Topic: readings Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1 Topic: readings Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0 Topic: readings Partition: 3 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: readings Partition: 4 Leader: 2 Replicas: 2,0 Isr: 2,0 Topic: readings Partition: 5 Leader: 1 Replicas: 1,2 Isr: 1,2现在 Kafak 准备工作已完成。让我们转向ClickHouse。 ClickHouse Kafka 引擎设置要将数据从 Kafka 主题读取到 ClickHouse 表,我们需要做三件事: 一个目标 MergeTree 表,用于为引入的数据提供主目录 一个 Kafka 引擎表,使主题看起来像一个 ClickHouse 表 用于自动将数据从 Kafka 移动到目标表的具体化视图 首先,我们将定义目标 MergeTree 表。登录到 ClickHouse 执行以下 SQL CREATE TABLE readings ( readings_id Int32 Codec(DoubleDelta, LZ4), time DateTime Codec(DoubleDelta, LZ4), date ALIAS toDate(time), temperature Decimal(5,2) Codec(T64, LZ4) ) Engine = MergeTree PARTITION BY toYYYYMM(time) ORDER BY (readings_id, time);接下来,我们需要使用 Kafka 引擎创建一个表来连接主题并读取数据。引擎将使用主题“readings”和消费者组名称“readings consumer_group1”从主机 kafka 的代理读取数据。输入格式为 CSV。 请注意,我们省略了“date”列。它是目标表中的别名,将从“time”列自动填充。 CREATE TABLE readings_queue ( readings_id Int32, time DateTime, temperature Decimal(5,2) ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka-headless.kafka:9092', kafka_topic_list = 'readings', kafka_group_name = 'readings_consumer_group1', kafka_format = 'CSV', kafka_max_block_size = 1048576;前面的设置处理最简单的情况:单个代理、单个主题且没有专用配置。 最后,我们创建一个物化视图,用于在 Kafka 和合并树表之间传输数据。 CREATE MATERIALIZED VIEW readings_queue_mv TO readings AS SELECT readings_id, time, temperature FROM readings_queue;这就是 Kafka 到 ClickHouse 的集成。让我们来测试一下。 加载数据现在是时候使用 kafka-console-producer 命令加载一些输入数据了。以下示例使用 CSV 格式添加三条记录。 kafka-console-producer --broker-list kafka:9092 --topic readings |
今日新闻 |
点击排行 |
|
推荐新闻 |
图片新闻 |
|
专题文章 |
CopyRight 2018-2019 实验室设备网 版权所有 win10的实时保护怎么永久关闭 |